package com.atguigu.realtime.app

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.common.Constant
import com.atguigu.realtime.bean.{OrderInfo, SaleDetail, UserInfo}
import com.atguigu.realtime.util.{ESUtil, MyRedisUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.jackson.Serialization
import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._


/**
 * Sale-detail streaming job: full-joins the order_info and order_detail streams
 * (using Redis to hold records whose counterpart has not arrived yet), enriches the
 * joined records with user_info read from MySQL, and writes the result to ES via ESUtil.
 *
 * Author atguigu
 * Date 2020/10/23 10:09
 */
object SaleDetailApp extends BaseAppV2 {
    override val appName: String = "SaleDetailApp"
    override val master: String = "local[2]"
    override val batchTime: Int = 3
    override val groupId: String = "SaleDetailApp"
    override val topic: Set[String] = Set(Constant.ORDER_DETAIL_TOPIC, Constant.ORDER_INFO_TOPIC)
    
    /** Time-to-live, in seconds, of the order_info / order_detail entries cached in Redis. */
    private val CacheTtlSeconds: Int = 30 * 60
    
    /**
     * Window-based join of order_info and order_detail (alternative to fullJoin_2; not called by run).
     *
     * Both streams get a 12s window sliding every 3s so that records arriving in nearby
     * batches can still meet. Because overlapping windows emit the same pair several
     * times, pairs are de-duplicated with the Redis set "gmall0523" of
     * "order_id:order_detail_id" members. The result is only printed.
     *
     * Caveats: this is an inner join (pairs further apart than one window are lost), and the
     * de-dup set has no expiry, so it grows without bound.
     */
    def fullJoin_1(orderInfoStream: DStream[(String, OrderInfo)],
                   orderDetailStream: DStream[(String, OrderDetail)]): Unit = {
        val orderInfoStreamWithWindow: DStream[(String, OrderInfo)] = orderInfoStream.window(Seconds(12), Seconds(3))
        val orderDetailStreamWithWindow: DStream[(String, OrderDetail)] = orderDetailStream.window(Seconds(12), Seconds(3))
        val saleDetailStream = orderInfoStreamWithWindow
            .join(orderDetailStreamWithWindow)
            .map {
                case (_, (orderInfo, orderDetail)) =>
                    SaleDetail()
                        .mergeOrderInfo(orderInfo)
                        .mergeOrderDetail(orderDetail)
            }
            .mapPartitions((saleDetailIt: Iterator[SaleDetail]) => {
                val client: Jedis = MyRedisUtil.getClient
                try {
                    // Iterator.filter is lazy: materialize the partition while the client is still
                    // open, otherwise every sadd would run after client.close().
                    saleDetailIt
                        .filter(saleDetail => 1 == client.sadd("gmall0523", saleDetail.order_id + ":" + saleDetail.order_detail_id))
                        .toList
                        .iterator
                } finally {
                    client.close()
                }
            })
        saleDetailStream.print()
    }
    
    /**
     * Caches an OrderDetail in Redis until its order_info arrives.
     *
     * Layout: hash "order_detail:<order_id>", field = order_detail id, value = JSON of the detail.
     * The hash is given a TTL so details whose order_info never shows up do not pile up forever.
     *
     * @param client      open Redis connection
     * @param orderDetail detail to cache
     * @return the result of HSET
     */
    def cacheOrderDetail(client: Jedis, orderDetail: OrderDetail) = {
        implicit val f = org.json4s.DefaultFormats
        val key = "order_detail:" + orderDetail.order_id
        val json = Serialization.write(orderDetail)
        val hsetResult = client.hset(key, orderDetail.id, json)
        // Refreshed on every new detail of the same order.
        client.expire(key, CacheTtlSeconds)
        hsetResult
    }
    
    /**
     * Caches an OrderInfo in Redis under "order_info:<id>" as JSON with a TTL, so order_detail
     * records arriving in later batches can still be joined with it.
     *
     * @param client    open Redis connection
     * @param orderInfo order to cache
     * @return the result of SETEX
     */
    def cacheOrderInfo(client: Jedis, orderInfo: OrderInfo) = {
        implicit val f = org.json4s.DefaultFormats
        val json = Serialization.write(orderInfo)
        client.setex("order_info:" + orderInfo.id, CacheTtlSeconds, json)
    }
    
    /**
     * Joins `orderInfo` with every order_detail cached under "order_detail:<orderId>" and then
     * deletes that hash, since those details have now been emitted.
     * Returns Nil when nothing is cached (HGETALL of a missing key yields an empty map).
     */
    private def joinCachedOrderDetails(client: Jedis, orderId: String, orderInfo: OrderInfo): List[SaleDetail] = {
        val key = "order_detail:" + orderId
        val cached = client.hgetAll(key).asScala
        if (cached.isEmpty) {
            Nil
        } else {
            val joined = cached.values.map { jsonString =>
                val orderDetail = JSON.parseObject(jsonString, classOf[OrderDetail])
                SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
            }.toList
            client.del(key)
            joined
        }
    }
    
    /**
     * Per-batch full outer join of order_info and order_detail, using Redis to bridge records
     * that arrive in different batches:
     *  - order_info is always cached (with TTL) so later details can find it;
     *  - order_info also picks up any details cached earlier for the same order;
     *  - an order_detail with no order_info in this batch looks it up in the cache, and is itself
     *    cached when the order_info is not there yet.
     *
     * @return the stream of joined SaleDetail records
     */
    def fullJoin_2(orderInfoStream: DStream[(String, OrderInfo)],
                   orderDetailStream: DStream[(String, OrderDetail)]): DStream[SaleDetail] = {
        orderInfoStream
            .fullOuterJoin(orderDetailStream)
            .mapPartitions(it => {
                val client: Jedis = MyRedisUtil.getClient
                try {
                    it.flatMap {
                        case (orderId, (Some(orderInfo), Some(orderDetail))) =>
                            cacheOrderInfo(client, orderInfo)
                            val saleDetail = SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                            joinCachedOrderDetails(client, orderId, orderInfo) :+ saleDetail
                        case (orderId, (Some(orderInfo), None)) =>
                            cacheOrderInfo(client, orderInfo)
                            joinCachedOrderDetails(client, orderId, orderInfo)
                        case (_, (None, Some(orderDetail))) =>
                            // Single GET instead of EXISTS + GET: the key may expire between two calls.
                            Option(client.get("order_info:" + orderDetail.order_id)) match {
                                case Some(orderInfoString) =>
                                    val orderInfo = JSON.parseObject(orderInfoString, classOf[OrderInfo])
                                    SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
                                case None =>
                                    // order_info not seen yet: park the detail until it arrives.
                                    cacheOrderDetail(client, orderDetail)
                                    Nil
                            }
                        case (_, (None, None)) =>
                            // Never produced by fullOuterJoin; present only to make the match exhaustive.
                            Nil
                    }
                        // flatMap is lazy: run all Redis work while the client is still open.
                        .toList
                        .iterator
                } finally {
                    client.close()
                }
            })
    }
    
    /**
     * Joins user_info into each SaleDetail.
     * For every batch, the distinct user ids are collected to the driver, the matching
     * user_info rows are read from MySQL through Spark SQL's JDBC source, and the two RDDs are
     * inner-joined on user id. Records without a matching user (or with a null user_id) are dropped.
     *
     * @param ssc              streaming context whose Spark configuration is reused for the SparkSession
     * @param saleDetailStream joined order/detail stream
     * @return the stream of SaleDetail records with user info merged in
     */
    def joinUser(ssc: StreamingContext, saleDetailStream: DStream[SaleDetail]): DStream[SaleDetail] = {
        val spark = SparkSession.builder()
            .config(ssc.sparkContext.getConf)
            .getOrCreate()
        import spark.implicits._
        
        // Renders a value as a MySQL string literal. The ids come from stream data, so they are
        // escaped rather than spliced in raw (assumes the default sql_mode, where backslash escapes).
        def sqlQuote(value: String): String =
            "'" + value.replace("\\", "\\\\").replace("'", "''") + "'"
        
        // Reads the user_info rows for the given (non-empty) id list, keyed by user id.
        // NOTE(review): the credentials are hard-coded; consider moving them to configuration.
        def readUserInfo(ids: List[String]): RDD[(String, UserInfo)] = {
            spark
                .read
                .format("jdbc")
                .option("url", "jdbc:mysql://hadoop102:3306/gmall0523?useSSL=false")
                .option("user", "root")
                .option("password", "aaaaaa")
                .option("query", s"select * from user_info where id in (${ids.map(sqlQuote).mkString(",")})")
                .load()
                .as[UserInfo]
                .rdd
                .map(userInfo => (userInfo.id, userInfo))
        }
        
        saleDetailStream.transform((rdd: RDD[SaleDetail]) => {
            // The batch RDD is used twice (id collection and join), so cache it.
            rdd.cache()
            val ids = rdd.map(_.user_id).filter(_ != null).distinct().collect().toList
            if (ids.isEmpty) {
                // Nothing could match; skip the JDBC round trip.
                rdd.sparkContext.emptyRDD[SaleDetail]
            } else {
                val userInfoRdd: RDD[(String, UserInfo)] = readUserInfo(ids)
                rdd
                    .map(saleDetail => (saleDetail.user_id, saleDetail))
                    .join(userInfoRdd)
                    .map {
                        case (_, (saleDetail, userInfo)) =>
                            saleDetail.mergeUserInfo(userInfo)
                    }
            }
        })
    }
    
    /**
     * Writes every SaleDetail to the ES index "gmall_sale_detail", calling ESUtil.insertBulk once
     * per partition. Each record is paired with order_id + "_" + order_detail_id, presumably used as
     * the document id so replays overwrite instead of duplicating (confirm in ESUtil.insertBulk).
     */
    def writeToES(saleDetailWithUserStream: DStream[SaleDetail]): Unit = {
        saleDetailWithUserStream.foreachRDD(rdd => {
            rdd.foreachPartition(sdIt => {
                ESUtil.insertBulk("gmall_sale_detail", sdIt.map(sd => (sd.order_id + "_" + sd.order_detail_id, sd)))
            })
        })
    }
    
    override def run(ssc: StreamingContext, topicAndStream: Map[String, DStream[String]]): Unit = {
        // Parse both topics and key them by order id so they can be joined.
        val orderInfoStream: DStream[(String, OrderInfo)] = topicAndStream(Constant.ORDER_INFO_TOPIC)
            .map(json => {
                val orderInfo = JSON.parseObject(json, classOf[OrderInfo])
                (orderInfo.id, orderInfo)
            })
        val orderDetailStream: DStream[(String, OrderDetail)] = topicAndStream(Constant.ORDER_DETAIL_TOPIC)
            .map(json => {
                val orderDetail = JSON.parseObject(json, classOf[OrderDetail])
                (orderDetail.order_id, orderDetail)
            })
        
        // 1. Full join the two streams (fullJoin_1 is a window-based alternative).
        val saleDetailStream = fullJoin_2(orderInfoStream, orderDetailStream)
        // 2. Join in the user info.
        val saleDetailWithUserStream = joinUser(ssc, saleDetailStream)
        // 3. Write the result to ES.
        writeToES(saleDetailWithUserStream)
    }
}

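/*
How order_info is cached (SETEX with a 30-minute TTL):
key                                 value
"order_info:" + order_id            the order_info record serialized as a JSON string
order_info:1                        {"": "", ...}

-------------

How order_detail could be cached, option (1): one string key per detail
key                                                          value
"order_detail:" + order_id + ":" + order_detail_id           JSON string

Fetching all details of one order then requires a pattern scan, e.g.
val keys = client.keys("order_detail:" + order_id + ":*")
(KEYS scans the whole keyspace, which is why this option is not used.)

How order_detail is cached, option (2) (used by cacheOrderDetail): one hash per order
key                                 hash field             hash value
"order_detail:" + order_id          order_detail_id        JSON string
 */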